软件设计讲解
2025-07-15
软件框架
框架设计图

业务系统启动流程

代码讲解
AI 唤醒&人声检测
class Application(object):
def on_keyword_spotting(self, state):
logger.info("on_keyword_spotting: {}".format(state))
if state == 0:
# 唤醒词触发
if self.__working_thread is not None and self.__working_thread.is_running():
return
self.__working_thread = Thread(target=self.__working_thread_handler)
self.__working_thread.start()
self.__keyword_spotting_event.clear()
else:
self.__keyword_spotting_event.set()
def on_voice_activity_detection(self, state):
gc.collect()
logger.info("on_voice_activity_detection: {}".format(state))
if state == 1:
self.__voice_activity_event.set() # 有人声
else:
self.__voice_activity_event.clear() # 无人声
AI 初始化
初始化 AI 对象以及其他硬件驱动。
class Application(object):
def __init__(self):
# 初始化唤醒按键
self.talk_key = ExtInt(ExtInt.GPIO19, ExtInt.IRQ_RISING_FALLING, ExtInt.PULL_PU, self.on_talk_key_click, 50)
# 初始化 led; write(1) 灭; write(0) 亮
self.wifi_red_led = Led(33)
self.wifi_green_led = Led(32)
self.power_red_led = Led(39)
self.power_green_led = Led(38)
self.lte_red_led = Led(23)
self.lte_green_led = Led(24)
self.led_power_pin = Pin(Pin.GPIO27, Pin.OUT, Pin.PULL_DISABLE, 0)
# 初始化充电管理
self.charge_manager = ChargeManager()
# 初始化音频管理
self.audio_manager = AudioManager()
self.audio_manager.set_kws_cb(self.on_keyword_spotting)
self.audio_manager.set_vad_cb(self.on_voice_activity_detection)
# 初始化网络管理
self.net_manager = NetManager()
# 初始化任务调度器
self.task_manager = TaskManager()
# 初始化协议
self.__protocol = WebSocketClient()
self.__protocol.set_callback(
audio_message_handler=self.on_audio_message,
json_message_handler=self.on_json_message
)
self.__working_thread = None
self.__record_thread = None
self.__record_thread_stop_event = Event()
self.__voice_activity_event = Event()
self.__keyword_spotting_event = Event()
AI 对话中断逻辑
class Application(object):
def __chat_process(self):
self.start_vad()
try:
with self.__protocol:
self.power_red_led.on()
self.__protocol.hello()
self.__protocol.wakeword_detected("小智")
is_listen_flag = False
while True:
data = self.audio_manager.opus_read()
if self.__voice_activity_event.is_set():
# 有人声
if not is_listen_flag:
self.__protocol.listen("start")
is_listen_flag = True
self.__protocol.send(data)
# logger.debug("send opus data to server")
else:
if is_listen_flag:
self.__protocol.listen("stop")
is_listen_flag = False
if not self.__protocol.is_state_ok():
break
# logger.debug("read opus data length: {}".format(len(data)))
except Exception as e:
logger.debug("working thread handler got Exception: {}".format(repr(e)))
finally:
self.power_red_led.blink(250, 250)
self.stop_vad()
音频管理
统一管理设备的音频输入输出、编解码、语音识别相关功能(关键词识别 KWS 和语音活动检测 VAD),并提供回调接口供上层应用使用。
class AudioManager(object):
def __init__(self, channel=0, volume=11, pa_number=29):
self.aud = audio.Audio(channel) # 初始化音频播放通道
self.aud.set_pa(pa_number)
self.aud.setVolume(volume) # 设置音量
self.aud.setCallback(self.audio_cb)
self.rec = audio.Record(channel)
self.__skip = 0
# ========== 音频文件 ====================
def audio_cb(self, event):
if event == 0:
# logger.info('audio play start.')
pass
elif event == 7:
# logger.info('audio play finish.')
pass
else:
pass
def play(self, file):
self.aud.play(0, 1, file)
# ========= opus ====================
def open_opus(self):
self.pcm = audio.Audio.PCM(0, 1, 16000, 2, 1, 15) # 5 -> 25
self.opus = Opus(self.pcm, 0, 6000) # 6000 ~ 128000
def close_opus(self):
self.opus.close()
self.pcm.close()
del self.opus
del self.pcm
def opus_read(self):
return self.opus.read(60)
def opus_write(self, data):
return self.opus.write(data)
# ========= vad & kws ====================
def set_kws_cb(self, cb):
self.rec.ovkws_set_callback(cb)
def set_vad_cb(self, cb):
def wrapper(state):
if self.__skip != 2:
self.__skip += 1
return
return cb(state)
self.rec.vad_set_callback(wrapper)
def end_cb(self, para):
if(para[0] == "stream"):
if(para[2] == 1):
pass
elif (para[2] == 3):
pass
else:
pass
else:
pass
def start_kws(self):
self.rec.ovkws_start("_xiao_zhi_xiao_zhi", 0.7)
def stop_kws(self):
self.rec.ovkws_stop()
def start_vad(self):
self.__skip = 0
self.rec.vad_start()
def stop_vad(self):
self.rec.vad_stop()